package com.xl.competition.modul_b.task1

import org.apache.spark.sql.{Row, SparkSession}

import java.util.Properties

object test1 {
  def main(args: Array[String]): Unit = {

    //初始化SparkSession对象  用t1变量去接受
    val t1: SparkSession = SparkSession.builder()
      .master("local[*]") //本地模式运行  使用本地全部线程
      .appName("test1") //取名叫test1
      .enableHiveSupport() //在spark程序上开启hive的支持
      .config("hive.metastore.uris", "thrift://node2:9083") //连接hive元数据的地址
      .getOrCreate() //执行这句才会真正的创建SparkSession对象

    val p = new Properties() //创建一个Java配置对象 用变量p接受
    p.put("user", "root") //向配置对象中添加MySQL的用户信息
    p.put("password", "Abc123..") //向配置对象中添加MySQL的密码信息

    t1.read
      .jdbc("jdbc:mysql://node3:3306/shtd_store", "base_province", p) //用spark去MySQL中读取数据  传入需要连接的地址  表名  以及上面创建的Java配置对象
      .createTempView("temp") //用spark将读到的数据创建为一个临时视图 命名为temp

    //获取之前最大的id值
    val r: Row = t1.sql(
      """
        |select max(id)
        |from ods.base_province
        |""".stripMargin)
      .take(1)(0)

    //防止获取到的值为空 所以先让其值为零
    var t = 0L
    //判断一下是不是为空
    if (!r.isNullAt(0)) {
      //不是就取到原来ods层中这个表的最大id值
      t = r.getLong(0)
    }

    //用spark去执行这段sql 查询MySQL中这个表的数据的id大于刚刚查出来的id的值 将满足条件的值写到hive中的ods.base_province的20231113这个静态分区中。
    t1.sql(
      s"""
         |insert into ods.base_province partition (etl_date = '20231113')
         |select id,
         |       name,
         |       region_id,
         |       area_code,
         |       iso_code,
         |       iso_3166_2,
         |       substr(current_timestamp(),1,19) create_time
         |from temp
         | where id > $t
         |""".stripMargin)
  }
}
